sqlstore.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. package sqlstore
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. "time"
  11. "github.com/go-sql-driver/mysql"
  12. "github.com/go-xorm/xorm"
  13. "github.com/grafana/grafana/pkg/bus"
  14. "github.com/grafana/grafana/pkg/infra/log"
  15. m "github.com/grafana/grafana/pkg/models"
  16. "github.com/grafana/grafana/pkg/registry"
  17. "github.com/grafana/grafana/pkg/services/annotations"
  18. "github.com/grafana/grafana/pkg/services/cache"
  19. "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  20. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  21. "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  22. "github.com/grafana/grafana/pkg/setting"
  23. _ "github.com/grafana/grafana/pkg/tsdb/mssql"
  24. "github.com/grafana/grafana/pkg/util"
  25. _ "github.com/lib/pq"
  26. )
  27. var (
  28. x *xorm.Engine
  29. dialect migrator.Dialect
  30. sqlog log.Logger = log.New("sqlstore")
  31. )
  32. const ContextSessionName = "db-session"
  33. func init() {
  34. registry.Register(&registry.Descriptor{
  35. Name: "SqlStore",
  36. Instance: &SqlStore{},
  37. InitPriority: registry.High,
  38. })
  39. }
  40. type SqlStore struct {
  41. Cfg *setting.Cfg `inject:""`
  42. Bus bus.Bus `inject:""`
  43. CacheService *cache.CacheService `inject:""`
  44. dbCfg DatabaseConfig
  45. engine *xorm.Engine
  46. log log.Logger
  47. Dialect migrator.Dialect
  48. skipEnsureAdmin bool
  49. }
  50. func (ss *SqlStore) Init() error {
  51. ss.log = log.New("sqlstore")
  52. ss.readConfig()
  53. engine, err := ss.getEngine()
  54. if err != nil {
  55. return fmt.Errorf("Fail to connect to database: %v", err)
  56. }
  57. ss.engine = engine
  58. ss.Dialect = migrator.NewDialect(ss.engine)
  59. // temporarily still set global var
  60. x = engine
  61. dialect = ss.Dialect
  62. migrator := migrator.NewMigrator(x)
  63. migrations.AddMigrations(migrator)
  64. for _, descriptor := range registry.GetServices() {
  65. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  66. if ok {
  67. sc.AddMigration(migrator)
  68. }
  69. }
  70. if err := migrator.Start(); err != nil {
  71. return fmt.Errorf("Migration failed err: %v", err)
  72. }
  73. // Init repo instances
  74. annotations.SetRepository(&SqlAnnotationRepo{})
  75. ss.Bus.SetTransactionManager(ss)
  76. // Register handlers
  77. ss.addUserQueryAndCommandHandlers()
  78. // ensure admin user
  79. if ss.skipEnsureAdmin {
  80. return nil
  81. }
  82. return ss.ensureAdminUser()
  83. }
  84. func (ss *SqlStore) ensureAdminUser() error {
  85. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  86. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  87. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  88. if err != nil {
  89. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  90. }
  91. if systemUserCountQuery.Result.Count > 0 {
  92. return nil
  93. }
  94. cmd := m.CreateUserCommand{}
  95. cmd.Login = setting.AdminUser
  96. cmd.Email = setting.AdminUser + "@localhost"
  97. cmd.Password = setting.AdminPassword
  98. cmd.IsAdmin = true
  99. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  100. return fmt.Errorf("Failed to create admin user: %v", err)
  101. }
  102. ss.log.Info("Created default admin", "user", setting.AdminUser)
  103. return nil
  104. })
  105. return err
  106. }
  107. func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
  108. if ss.dbCfg.UrlQueryParams == nil {
  109. return ""
  110. }
  111. var sb strings.Builder
  112. for key, values := range ss.dbCfg.UrlQueryParams {
  113. for _, value := range values {
  114. sb.WriteRune(sep)
  115. sb.WriteString(key)
  116. sb.WriteRune('=')
  117. sb.WriteString(value)
  118. }
  119. }
  120. return sb.String()
  121. }
  122. func (ss *SqlStore) buildConnectionString() (string, error) {
  123. cnnstr := ss.dbCfg.ConnectionString
  124. // special case used by integration tests
  125. if cnnstr != "" {
  126. return cnnstr, nil
  127. }
  128. switch ss.dbCfg.Type {
  129. case migrator.MYSQL:
  130. protocol := "tcp"
  131. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  132. protocol = "unix"
  133. }
  134. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  135. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  136. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  137. tlsCert, err := makeCert(ss.dbCfg)
  138. if err != nil {
  139. return "", err
  140. }
  141. mysql.RegisterTLSConfig("custom", tlsCert)
  142. cnnstr += "&tls=custom"
  143. }
  144. cnnstr += ss.buildExtraConnectionString('&')
  145. case migrator.POSTGRES:
  146. host, port := util.SplitHostPortDefault(ss.dbCfg.Host, "127.0.0.1", "5432")
  147. if ss.dbCfg.Pwd == "" {
  148. ss.dbCfg.Pwd = "''"
  149. }
  150. if ss.dbCfg.User == "" {
  151. ss.dbCfg.User = "''"
  152. }
  153. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  154. cnnstr += ss.buildExtraConnectionString(' ')
  155. case migrator.SQLITE:
  156. // special case for tests
  157. if !filepath.IsAbs(ss.dbCfg.Path) {
  158. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  159. }
  160. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  161. cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
  162. cnnstr += ss.buildExtraConnectionString('&')
  163. default:
  164. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  165. }
  166. return cnnstr, nil
  167. }
  168. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  169. connectionString, err := ss.buildConnectionString()
  170. if err != nil {
  171. return nil, err
  172. }
  173. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  174. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  175. if err != nil {
  176. return nil, err
  177. }
  178. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  179. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  180. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  181. // configure sql logging
  182. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  183. if !debugSql {
  184. engine.SetLogger(&xorm.DiscardLogger{})
  185. } else {
  186. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  187. engine.ShowSQL(true)
  188. engine.ShowExecTime(true)
  189. }
  190. return engine, nil
  191. }
  192. func (ss *SqlStore) readConfig() {
  193. sec := ss.Cfg.Raw.Section("database")
  194. cfgURL := sec.Key("url").String()
  195. if len(cfgURL) != 0 {
  196. dbURL, _ := url.Parse(cfgURL)
  197. ss.dbCfg.Type = dbURL.Scheme
  198. ss.dbCfg.Host = dbURL.Host
  199. pathSplit := strings.Split(dbURL.Path, "/")
  200. if len(pathSplit) > 1 {
  201. ss.dbCfg.Name = pathSplit[1]
  202. }
  203. userInfo := dbURL.User
  204. if userInfo != nil {
  205. ss.dbCfg.User = userInfo.Username()
  206. ss.dbCfg.Pwd, _ = userInfo.Password()
  207. }
  208. ss.dbCfg.UrlQueryParams = dbURL.Query()
  209. } else {
  210. ss.dbCfg.Type = sec.Key("type").String()
  211. ss.dbCfg.Host = sec.Key("host").String()
  212. ss.dbCfg.Name = sec.Key("name").String()
  213. ss.dbCfg.User = sec.Key("user").String()
  214. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  215. ss.dbCfg.Pwd = sec.Key("password").String()
  216. }
  217. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  218. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  219. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  220. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  221. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  222. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  223. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  224. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  225. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  226. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  227. }
  228. // Interface of arguments for testing db
  229. type ITestDB interface {
  230. Helper()
  231. Fatalf(format string, args ...interface{})
  232. }
  233. // InitTestDB initiliaze test DB
  234. func InitTestDB(t ITestDB) *SqlStore {
  235. t.Helper()
  236. sqlstore := &SqlStore{}
  237. sqlstore.skipEnsureAdmin = true
  238. sqlstore.Bus = bus.New()
  239. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  240. dbType := migrator.SQLITE
  241. // environment variable present for test db?
  242. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  243. dbType = db
  244. }
  245. // set test db config
  246. sqlstore.Cfg = setting.NewCfg()
  247. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  248. sec.NewKey("type", dbType)
  249. switch dbType {
  250. case "mysql":
  251. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  252. case "postgres":
  253. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  254. default:
  255. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  256. }
  257. // need to get engine to clean db before we init
  258. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  259. if err != nil {
  260. t.Fatalf("Failed to init test database: %v", err)
  261. }
  262. sqlstore.Dialect = migrator.NewDialect(engine)
  263. // temp global var until we get rid of global vars
  264. dialect = sqlstore.Dialect
  265. if err := dialect.CleanDB(); err != nil {
  266. t.Fatalf("Failed to clean test db %v", err)
  267. }
  268. if err := sqlstore.Init(); err != nil {
  269. t.Fatalf("Failed to init test database: %v", err)
  270. }
  271. sqlstore.engine.DatabaseTZ = time.UTC
  272. sqlstore.engine.TZLocation = time.UTC
  273. return sqlstore
  274. }
  275. func IsTestDbMySql() bool {
  276. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  277. return db == migrator.MYSQL
  278. }
  279. return false
  280. }
  281. func IsTestDbPostgres() bool {
  282. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  283. return db == migrator.POSTGRES
  284. }
  285. return false
  286. }
  287. type DatabaseConfig struct {
  288. Type string
  289. Host string
  290. Name string
  291. User string
  292. Pwd string
  293. Path string
  294. SslMode string
  295. CaCertPath string
  296. ClientKeyPath string
  297. ClientCertPath string
  298. ServerCertName string
  299. ConnectionString string
  300. MaxOpenConn int
  301. MaxIdleConn int
  302. ConnMaxLifetime int
  303. CacheMode string
  304. UrlQueryParams map[string][]string
  305. }